1 package org.apache.maven.surefire.junitcore.pc;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import org.apache.maven.surefire.report.ConsoleLogger;
23 import org.junit.runner.Description;
24 import org.junit.runners.model.RunnerScheduler;
25
26 import java.io.ByteArrayOutputStream;
27 import java.io.PrintStream;
28 import java.util.Collection;
29 import java.util.Set;
30 import java.util.concurrent.ConcurrentLinkedQueue;
31 import java.util.concurrent.CopyOnWriteArraySet;
32 import java.util.concurrent.RejectedExecutionException;
33 import java.util.concurrent.RejectedExecutionHandler;
34 import java.util.concurrent.ThreadPoolExecutor;
35
36
37
38
39
40
41
42
43
44
45
46
47
48 public class Scheduler
49 implements RunnerScheduler
50 {
51 private final Balancer balancer;
52
53 private final SchedulingStrategy strategy;
54
55 private final Set<Controller> slaves = new CopyOnWriteArraySet<Controller>();
56
57 private final Description description;
58
59 private final ConsoleLogger logger;
60
61 private volatile boolean shutdown = false;
62
63 private volatile boolean started = false;
64
65 private volatile boolean finished = false;
66
67 private volatile Controller masterController;
68
69
70
71
72
73
74
75 public Scheduler( ConsoleLogger logger, Description description, SchedulingStrategy strategy )
76 {
77 this( logger, description, strategy, -1 );
78 }
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93 public Scheduler( ConsoleLogger logger, Description description, SchedulingStrategy strategy, int concurrency )
94 {
95 this( logger, description, strategy, BalancerFactory.createBalancer( concurrency ) );
96 }
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113 public Scheduler( ConsoleLogger logger, Description description, SchedulingStrategy strategy, Balancer balancer )
114 {
115 strategy.setDefaultShutdownHandler( newShutdownHandler() );
116 this.logger = logger;
117 this.description = description;
118 this.strategy = strategy;
119 this.balancer = balancer;
120 masterController = null;
121 }
122
123
124
125
126
127
128
129
130
131
132
133
134 public Scheduler( ConsoleLogger logger, Description description, Scheduler masterScheduler,
135 SchedulingStrategy strategy, Balancer balancer )
136 {
137 this( logger, description, strategy, balancer );
138 strategy.setDefaultShutdownHandler( newShutdownHandler() );
139 masterScheduler.register( this );
140 }
141
142
143
144
145
146
147
148
149 public Scheduler( ConsoleLogger logger, Description description, Scheduler masterScheduler,
150 SchedulingStrategy strategy, int concurrency )
151 {
152 this( logger, description, strategy, concurrency );
153 strategy.setDefaultShutdownHandler( newShutdownHandler() );
154 masterScheduler.register( this );
155 }
156
157
158
159
160
161
162
163 public Scheduler( ConsoleLogger logger, Description description, Scheduler masterScheduler,
164 SchedulingStrategy strategy )
165 {
166 this( logger, description, masterScheduler, strategy, 0 );
167 }
168
169 private void setController( Controller masterController )
170 {
171 if ( masterController == null )
172 {
173 throw new NullPointerException( "null ExecutionController" );
174 }
175 this.masterController = masterController;
176 }
177
178
179
180
181
182 private boolean register( Scheduler slave )
183 {
184 boolean canRegister = slave != null && slave != this;
185 if ( canRegister )
186 {
187 Controller controller = new Controller( slave );
188 canRegister = !slaves.contains( controller );
189 if ( canRegister )
190 {
191 slaves.add( controller );
192 slave.setController( controller );
193 }
194 }
195 return canRegister;
196 }
197
198
199
200
201 private boolean canSchedule()
202 {
203 return !shutdown && ( masterController == null || masterController.canSchedule() );
204 }
205
206 protected void logQuietly( Throwable t )
207 {
208 ByteArrayOutputStream out = new ByteArrayOutputStream();
209 PrintStream stream = new PrintStream( out );
210 t.printStackTrace( stream );
211 stream.close();
212 logger.info( out.toString() );
213 }
214
215 protected void logQuietly( String msg )
216 {
217 logger.info( msg );
218 }
219
220
221
222
223
224
225
226
227
228
229
230 protected ShutdownResult describeStopped( boolean stopNow )
231 {
232 Collection<Description> executedTests = new ConcurrentLinkedQueue<Description>();
233 Collection<Description> incompleteTests = new ConcurrentLinkedQueue<Description>();
234 stop( executedTests, incompleteTests, false, stopNow );
235 return new ShutdownResult( executedTests, incompleteTests );
236 }
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254 private void stop( Collection<Description> executedTests, Collection<Description> incompleteTests,
255 boolean tryCancelFutures, boolean stopNow )
256 {
257 shutdown = true;
258 try
259 {
260 if ( started && !ParallelComputerUtil.isUnusedDescription( description ) )
261 {
262 if ( executedTests != null )
263 {
264 executedTests.add( description );
265 }
266
267 if ( incompleteTests != null && !finished )
268 {
269 incompleteTests.add( description );
270 }
271 }
272
273 for ( Controller slave : slaves )
274 {
275 slave.stop( executedTests, incompleteTests, tryCancelFutures, stopNow );
276 }
277 }
278 finally
279 {
280 try
281 {
282 balancer.releaseAllPermits();
283 }
284 finally
285 {
286 if ( stopNow )
287 {
288 strategy.stopNow();
289 }
290 else if ( tryCancelFutures )
291 {
292 strategy.stop();
293 }
294 else
295 {
296 strategy.disable();
297 }
298 }
299 }
300 }
301
302 protected boolean shutdownThreadPoolsAwaitingKilled()
303 {
304 if ( masterController == null )
305 {
306 stop( null, null, true, false );
307 boolean isNotInterrupted = true;
308 if ( strategy != null )
309 {
310 isNotInterrupted = strategy.destroy();
311 }
312 for ( Controller slave : slaves )
313 {
314 isNotInterrupted &= slave.destroy();
315 }
316 return isNotInterrupted;
317 }
318 else
319 {
320 throw new UnsupportedOperationException( "cannot call this method if this is not a master scheduler" );
321 }
322 }
323
324 protected void beforeExecute()
325 {
326 }
327
328 protected void afterExecute()
329 {
330 }
331
332 public void schedule( Runnable childStatement )
333 {
334 if ( childStatement == null )
335 {
336 logQuietly( "cannot schedule null" );
337 }
338 else if ( canSchedule() && strategy.canSchedule() )
339 {
340 try
341 {
342 boolean isNotInterrupted = balancer.acquirePermit();
343 if ( isNotInterrupted && !shutdown )
344 {
345 Runnable task = wrapTask( childStatement );
346 strategy.schedule( task );
347 started = true;
348 }
349 }
350 catch ( RejectedExecutionException e )
351 {
352 stop( null, null, true, false );
353 }
354 catch ( Throwable t )
355 {
356 balancer.releasePermit();
357 logQuietly( t );
358 }
359 }
360 }
361
362 public void finished()
363 {
364 try
365 {
366 strategy.finished();
367 }
368 catch ( InterruptedException e )
369 {
370 logQuietly( e );
371 }
372 finally
373 {
374 finished = true;
375 }
376 }
377
378 private Runnable wrapTask( final Runnable task )
379 {
380 return new Runnable()
381 {
382 public void run()
383 {
384 try
385 {
386 beforeExecute();
387 task.run();
388 }
389 finally
390 {
391 try
392 {
393 afterExecute();
394 }
395 finally
396 {
397 balancer.releasePermit();
398 }
399 }
400 }
401 };
402 }
403
404 protected ShutdownHandler newShutdownHandler()
405 {
406 return new ShutdownHandler();
407 }
408
409
410
411
412 private final class Controller
413 {
414 private final Scheduler slave;
415
416 private Controller( Scheduler slave )
417 {
418 this.slave = slave;
419 }
420
421
422
423
424 boolean canSchedule()
425 {
426 return Scheduler.this.canSchedule();
427 }
428
429 void stop( Collection<Description> executedTests, Collection<Description> incompleteTests,
430 boolean tryCancelFutures, boolean shutdownNow )
431 {
432 slave.stop( executedTests, incompleteTests, tryCancelFutures, shutdownNow );
433 }
434
435
436
437
438 boolean destroy()
439 {
440 return slave.strategy.destroy();
441 }
442
443 @Override
444 public int hashCode()
445 {
446 return slave.hashCode();
447 }
448
449 @Override
450 public boolean equals( Object o )
451 {
452 return o == this || ( o instanceof Controller ) && slave.equals( ( (Controller) o ).slave );
453 }
454 }
455
456
457
458
459
460
461
462
463
464 public class ShutdownHandler
465 implements RejectedExecutionHandler
466 {
467 private volatile RejectedExecutionHandler poolHandler;
468
469 protected ShutdownHandler()
470 {
471 poolHandler = null;
472 }
473
474 public void setRejectedExecutionHandler( RejectedExecutionHandler poolHandler )
475 {
476 this.poolHandler = poolHandler;
477 }
478
479 public void rejectedExecution( Runnable r, ThreadPoolExecutor executor )
480 {
481 if ( executor.isShutdown() )
482 {
483 Scheduler.this.stop( null, null, true, false );
484 }
485 final RejectedExecutionHandler poolHandler = this.poolHandler;
486 if ( poolHandler != null )
487 {
488 poolHandler.rejectedExecution( r, executor );
489 }
490 }
491 }
492 }